Spark Agent

Spark Agent JARs

The latest version is 0.60.5.

Download the relevant version:

# Latest
wget https://user:[email protected]/java/definity-spark-agent-[spark.version]-latest.jar

# Specific version
wget https://user:[email protected]/java/definity-spark-agent-[spark.version]-[agent.version].jar

Configuration Options

Technical Parameters

| Name | Details |
| --- | --- |
| spark.jars | URL of definity-spark-agent-X.X.jar (and optionally definity-spark-iceberg-1.2-X.X.jar) |
| spark.plugins | Add ai.definity.spark.plugin.DefinitySparkPlugin (for Spark 3.x) |
| spark.extraListeners | Add ai.definity.spark.AppListener (for Spark 2.x) |
| spark.definity.server | Definity server URL (e.g., https://app.definity.run) |
| spark.definity.api.token | Integration token (required for SaaS usage) |
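
The table mentions a listener-based path for Spark 2.x that the later examples don't show. As a minimal sketch, assuming a Spark 2.x build of the agent (the JAR name and token value are placeholders):

from pyspark.sql import SparkSession

# Spark 2.x: register the listener instead of the Spark 3.x plugin.
spark = (
    SparkSession.builder.appName("legacy_spark2_app")
    .config("spark.jars", "definity-spark-agent-2.4-X.X.jar")  # placeholder JAR name
    .config("spark.extraListeners", "ai.definity.spark.AppListener")
    .config("spark.definity.server", "https://app.definity.run")
    .config("spark.definity.api.token", "<token>")  # required for SaaS usage
    .getOrCreate()
)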

Applicative Parameters

| Name | Details |
| --- | --- |
| spark.definity.env.name | Environment name; defaults to default |
| spark.definity.pipeline.name | Pipeline name; defaults to spark.app.name |
| spark.definity.pipeline.pit | Point-in-Time grouping for tasks; defaults to the current time |
| spark.definity.task.name | Logical task name, stable across runs; defaults to spark.app.name |

Advanced

| Name | Details |
| --- | --- |
| spark.definity.enabled | Enables or disables the agent: true, false, or opt-in (default: true). With opt-in, users can toggle it on the pipeline settings page. |
| spark.definity.pipeline.run.id | Used for grouping tasks in the same run. |
| spark.definity.task.id | User-defined task ID shown in the UI and notifications (e.g., YARN run ID); defaults to spark.app.name. |
| spark.definity.tags | Comma-separated tags; supports key:value format (e.g., team:team-A). |
| spark.definity.email.to | Comma-separated list of notification recipient emails. |
| spark.definity.task.heartbeat.interval | Interval in seconds for sending heartbeats to the server; defaults to 60. |
| spark.definity.server.request.retry.count | Number of retries for server request errors; defaults to 1. |
| spark.definity.ignoredTables | Comma-separated list of tables to ignore. Names can be full (e.g., db_a.table_a) or partial (e.g., table_a), which applies to all databases. |
| spark.definity.files.sanitizedNamePattern | Regular expression used to extract time partitions from file names; defaults to `^.*?(?=/\d+/\|/[^/]*=[^/]*/)`. Set empty to disable. |
| spark.definity.delta.enabled | Enables Delta instrumentation; defaults to true. Set to false to opt out. |
| spark.definity.inputs.maxPerQuery | Maximum number of allowed inputs per query; defaults to 100. |
| spark.definity.default.session.enabled | Enables the default session for apps with multiple concurrent SparkSessions; defaults to true. Set to false to disable. |
| spark.definity.default.session.rotationSeconds | Maximum duration in seconds for the default session before rotation; defaults to 3600. |
| spark.definity.metrics.injection.enabled | Enables in-flight data-distribution metrics; defaults to false. |
| spark.definity.debug | Enables debug logs; defaults to false. |
| spark.definity.databricks.automaticSessions.enabled | Enables auto-detection of tasks in Databricks multi-task workflows; defaults to true. |
| spark.definity.events.enabled | Enables reporting of events; defaults to true. |
| spark.definity.events.maxPerTaskRun | Maximum number of events to report in one task; defaults to 5000. |
| spark.definity.slowPlanning.thresholdSeconds | Threshold in seconds above which execution planning is considered too slow, triggering an event; defaults to 60. |
| spark.definity.plugin.executor.enabled | Enables the executor-side plugin when the Definity plugin is configured; defaults to true. |
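
As a sketch of how a few of these advanced options compose in one session (the tag values, recipient address, and table names below are placeholders):

from pyspark import SparkConf
from pyspark.sql import SparkSession

# Illustrative values only; see the table above for each option's semantics.
conf = SparkConf().setAll([
    ("spark.definity.tags", "team:team-A,env:staging"),
    ("spark.definity.email.to", "[email protected]"),
    ("spark.definity.ignoredTables", "db_a.table_a,tmp_results"),
])
spark = SparkSession.builder.config(conf=conf).getOrCreate()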

Metrics Calculation

| Name | Details |
| --- | --- |
| spark.definity.num.threads | Number of threads for metrics calculation; defaults to 2. |
| spark.definity.metrics.timeout | Timeout for metrics calculation, in seconds; defaults to 180. |
| spark.definity.metrics.histogram.maxNumValues | Maximum number of values for histogram distribution; defaults to 10. |
| spark.definity.metrics.executorsMetrics.enabled | Whether to extract metrics from Spark's ExecutorMetricsUpdate event; defaults to true. |
| spark.definity.metrics.timeSeries.intervalSeconds | Time-series metrics bucket size in seconds; defaults to 60. |
| spark.definity.driver.containerMemory | Total container memory for the driver, in bytes (for client mode). |
| spark.definity.driver.heapMemory | Total heap memory for the driver, in bytes (for client mode). |
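
For client mode, the two driver memory figures can be supplied explicitly. A minimal sketch, assuming a 4 GiB container with a 3 GiB heap (both values are illustrative):

from pyspark import SparkConf
from pyspark.sql import SparkSession

# Both settings take byte counts, per the table above.
conf = (
    SparkConf()
    .set("spark.definity.driver.containerMemory", str(4 * 1024**3))  # 4 GiB
    .set("spark.definity.driver.heapMemory", str(3 * 1024**3))       # 3 GiB
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()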

Custom Metrics

To report custom metrics, return two columns from your query:

  • definity_metric_name (string)
  • definity_metric_value (numeric)

Example:

spark.sql(
    "select 'my_new_metric' as definity_metric_name, 1.5 as definity_metric_value"
).collect()
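
Since any query returning the two columns works, the same metric can presumably be built with the DataFrame API as well. A sketch assuming an active SparkSession named spark (the metric name and value are placeholders):

from pyspark.sql import functions as F

# Build a one-row DataFrame with the two required columns; triggering the
# query (collect) is what reports the metric.
spark.range(1).select(
    F.lit("my_new_metric").alias("definity_metric_name"),
    F.lit(1.5).alias("definity_metric_value"),
).collect()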

Output Diversion

Useful for CI shadow-run flows, where job outputs are diverted away from their production locations.

| Name | Details |
| --- | --- |
| spark.definity.output.table.suffix | Suffix to add to all output table names. |
| spark.definity.output.database.suffix | Suffix to add to all output tables' database names. |
| spark.definity.output.database.baseLocation | Base location for all created output databases. |
| spark.definity.output.file.baseLocation | Base location for output files. Either a full path (e.g., gs://my-tests-bucket), to divert all files to a single location regardless of their original location, or a partial path (e.g., my-tests-base-dir), to keep each file in its own bucket but under a different base directory. |
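
For example, a shadow run might suffix every output table and database and divert files to a test bucket. A minimal sketch (the suffix and the bucket, taken from the example above, are placeholders):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("ci_shadow_run")
    # Writes to db_a.table_a land in db_a_ci_test.table_a_ci_test instead.
    .config("spark.definity.output.table.suffix", "_ci_test")
    .config("spark.definity.output.database.suffix", "_ci_test")
    # Divert all output files to a single test bucket.
    .config("spark.definity.output.file.baseLocation", "gs://my-tests-bucket")
    .getOrCreate()
)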

Skew Events

Skew events are calculated on the executors, using Spark's plugin mechanism.

| Name | Details |
| --- | --- |
| spark.definity.plugin.executor.driverPollingIntervalMs | Interval in milliseconds between consecutive polling requests from executor to driver when using the Definity plugin; defaults to 20000. |
| spark.definity.skewDetection.minTaskSkewTimeSeconds | Minimum difference in seconds between a suspected skewed task's duration and the average task duration in its stage; defaults to 60. |
| spark.definity.skewDetection.minTaskSkewFactor | Minimum ratio between a suspected skewed task's duration and the average task duration in its stage; defaults to 2. |
| spark.definity.skewDetection.samplingRatio | Sampling ratio of task rows (e.g., 0.01 equals 1% sampling); defaults to 0.01. |
| spark.definity.skewDetection.maxSampledRowsPerTask | Maximum number of sampled rows per task; defaults to 1000. |
| spark.definity.skewDetection.maxReportedKeysPerTask | Maximum number of reported keys per task; defaults to 10. |
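
To make detection more sensitive, for instance, the thresholds can be lowered below their defaults. A sketch with illustrative values:

from pyspark.sql import SparkSession

# Flag tasks that run at least 30s and 1.5x longer than their stage average,
# and sample more rows per task to surface hot keys (values illustrative).
skew_conf = {
    "spark.definity.skewDetection.minTaskSkewTimeSeconds": "30",
    "spark.definity.skewDetection.minTaskSkewFactor": "1.5",
    "spark.definity.skewDetection.samplingRatio": "0.05",
}
builder = SparkSession.builder.appName("skew_tuned_app")
for key, value in skew_conf.items():
    builder = builder.config(key, value)
spark = builder.getOrCreate()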

Examples

PySpark

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("demo_pyspark")
    .config("spark.jars", "definity-spark-agent-X.X.jar")
    .config("spark.plugins", "ai.definity.spark.plugin.DefinitySparkPlugin")
    .config("spark.definity.server", "https://app.definity.run")
    .config("spark.definity.env.name", "dev")
    .config("spark.definity.pipeline.name", "demo-pipeline")
    .config("spark.definity.pipeline.pit", "2023-04-01")
    .config("spark.definity.task.name", "demo-spark-task")
    .enableHiveSupport()
    .getOrCreate()
)

Airflow Integration

Using Jinja templating:

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="spark_dag",
) as dag:
    op1 = BashOperator(
        task_id="spark_task",
        bash_command="spark-submit ... \
            --jars ...,definity-spark-agent-X.X.jar \
            --conf spark.plugins=ai.definity.spark.plugin.DefinitySparkPlugin \
            --conf spark.definity.server=https://app.definity.run \
            --conf spark.definity.env.name=dev \
            --conf spark.definity.pipeline.name='{{ dag_run.dag_id }}' \
            --conf spark.definity.pipeline.pit='{{ ts }}' \
            --conf spark.definity.task.name='{{ ti.task_id }}' \
            ...",
    )